Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Commit the session between writing and deletion of RTIF #42928

Merged
merged 3 commits into from
Oct 14, 2024

Conversation

ephraimbuddy
Copy link
Contributor

@ephraimbuddy ephraimbuddy commented Oct 11, 2024

Previously, this was how it was done, but now, a session was used for both the writing and deletion of RTIF without committing it, which we suspect caused StaleDataError. The related PR: #38565

This PR brings back the old behaviour of using different sessions for writing/deleting RTIFs

The ERROR:

[2024-10-08T14:58:00.817+0000] {taskinstance.py:3310} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/airflow/models/taskinstance.py", line 273, in _run_raw_task
    TaskInstance._execute_task_with_callbacks(
  File "/usr/local/lib/python3.10/site-packages/airflow/models/taskinstance.py", line 3122, in _execute_task_with_callbacks
    _update_rtif(ti=self, rendered_fields=rendered_fields)
  File "/usr/local/lib/python3.10/site-packages/airflow/api_internal/internal_api_call.py", line 139, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/airflow/utils/session.py", line 97, in wrapper
    return func(*args, session=session, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/airflow/models/taskinstance.py", line 1642, in _update_rtif
    RenderedTaskInstanceFields.delete_old_records(ti.task_id, ti.dag_id, session=session)
  File "/usr/local/lib/python3.10/site-packages/airflow/utils/session.py", line 94, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/airflow/models/renderedtifields.py", line 271, in delete_old_records
    session.flush()
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 3449, in flush
    self._flush(objects)
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 3588, in _flush
    with util.safe_reraise():
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
    compat.raise_(
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
    raise exception
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 3549, in _flush
    flush_context.execute()
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/unitofwork.py", line 456, in execute
    rec.execute(self)
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/unitofwork.py", line 630, in execute
    util.preloaded.orm_persistence.save_obj(
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/persistence.py", line 237, in save_obj
    _emit_update_statements(
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/persistence.py", line 1035, in _emit_update_statements
    raise orm_exc.StaleDataError(
sqlalchemy.orm.exc.StaleDataError: UPDATE statement on table 'rendered_task_instance_fields' expected to update 1 row(s); 0 were matched.

@dstandish
Copy link
Contributor

This PR brings back the old behaviour of using different sessions for writing/deleting RTIFs

I think has continued to use the same session. Pretty sure that the difference is pre-AIP-44 we committed in between.

Here's a way to explore:

session.execute("create table parent(id int);");
session.commit()
session.execute("insert into parent (id) values (1),(2),(3);")
with create_session() as session2:
    print(session2.execute("select * from parent").all())
assert session2 is session

Why this matters?

I feel like it's better to commit explicitly so that we see it. Rather than having it done invisibly by the decorator.

@ephraimbuddy
Copy link
Contributor Author

This PR brings back the old behaviour of using different sessions for writing/deleting RTIFs

I think has continued to use the same session. Pretty sure that the difference is pre-AIP-44 we committed in between.

Here's a way to explore:

session.execute("create table parent(id int);");
session.commit()
session.execute("insert into parent (id) values (1),(2),(3);")
with create_session() as session2:
    print(session2.execute("select * from parent").all())
assert session2 is session

Yeah, Issuing a commit persists things into the DB, not that the session is different.

Why this matters?

I feel like it's better to commit explicitly so that we see it. Rather than having it done invisibly by the decorator.

The provide session is already doing that? Or you want it removed from the RTIF write method? I feel it's a better design to have the provide_session do it than

RTIF.write(session)
session.commit()
RTIF.delete_old_record(session)
session.commit()

@ephraimbuddy ephraimbuddy changed the title Use different sessions in writing and deletion of RTIF Commit the session between writing and deletion of RTIF Oct 11, 2024
@dstandish
Copy link
Contributor

dstandish commented Oct 11, 2024

The provide session is already doing that? Or you want it removed from the RTIF write method?

It won't do it if you provide a session. So we don't necessarily have to remove the decorator, though doesn't seem a bad idea if we don't need it.

I feel it's a better design to have the provide_session do it than

Well, reasonable minds can differ. But what motivates me is "prefer explicit over implicit".

Do it as you wish, but I think when we let provide_session do the commits implicitly, it's much less obvious what's going on (and i think that's part of how we got into trouble here).

Commiting explicitly makes it very clear. It also affords an opportunity to explain why we commit there.

Anyway, most of this I think will be changed as part of AIP-72, so this i think is mainly academic, about us sort of just figuring out what's the best thing to do in this kind of scenario.

@ashb
Copy link
Member

ashb commented Oct 12, 2024

Rather than two commits you can also do session.flush() which makes SQLA issue the commands to the DB such that a future select on the same session sees the same rows, but it keeps it in the same session, such that the total operation is atomic still

@ephraimbuddy
Copy link
Contributor Author

Rather than two commits you can also do session.flush() which makes SQLA issue the commands to the DB such that a future select on the same session sees the same rows, but it keeps it in the same session, such that the total operation is atomic still

If I read this correctly, we would still have the issue: https://docs.sqlalchemy.org/en/20/orm/session_basics.html#flushing. Does this mean there was a flush in the update statement when we called write before the delete statement?

@ashb
Copy link
Member

ashb commented Oct 13, 2024

We aren't using 2.0 style though, so I think we have to call flush ourselves in 1.4/classic style

@ephraimbuddy ephraimbuddy force-pushed the fix-rtif-staledataerror branch 2 times, most recently from 1f9a384 to 1e3d543 Compare October 14, 2024 09:42
Previously, this was how it was done, but now,
a session was used for both the writing and deletion of RTIF,
which we suspect caused StaleDataError. The related PR: apache#38565

This PR brings back the old behaviour of using different sessions for writing/deleting RTIFs
@ephraimbuddy ephraimbuddy merged commit ced319f into apache:main Oct 14, 2024
51 checks passed
@ephraimbuddy ephraimbuddy deleted the fix-rtif-staledataerror branch October 14, 2024 16:00
ephraimbuddy added a commit to astronomer/airflow that referenced this pull request Oct 14, 2024
* FLush the session before deleting the RTIF data

Previously, this was how it was done, but now,
a session was used for both the writing and deletion of RTIF,
which we suspect caused StaleDataError. The related PR: apache#38565

This PR brings back the old behaviour of using different sessions for writing/deleting RTIFs

* fixup! Use different sessions in writing and deletion of RTIF

* add test and use flush

(cherry picked from commit ced319f)
ephraimbuddy added a commit that referenced this pull request Oct 14, 2024
* FLush the session before deleting the RTIF data

Previously, this was how it was done, but now,
a session was used for both the writing and deletion of RTIF,
which we suspect caused StaleDataError. The related PR: #38565

This PR brings back the old behaviour of using different sessions for writing/deleting RTIFs

* fixup! Use different sessions in writing and deletion of RTIF

* add test and use flush

(cherry picked from commit ced319f)
R7L208 pushed a commit to R7L208/airflow that referenced this pull request Oct 17, 2024
* Use different sessions in writing and deletion of RTIF

Previously, this was how it was done, but now,
a session was used for both the writing and deletion of RTIF,
which we suspect caused StaleDataError. The related PR: apache#38565

This PR brings back the old behaviour of using different sessions for writing/deleting RTIFs

* fixup! Use different sessions in writing and deletion of RTIF

* add test and use flush
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants